这节内容我们先来研究一下 server 端连接建立过程。

    先放上 server 端的 main 函数。

    我们发现其实 server 端连接的建立主要包括三步:

    (1)创建 server

    (2)server 的注册

    (3)调用 Serve 监听端口并处理请求

    ok,弄清楚主流程之后下面我们进入每个步骤里面去看一下代码实现。

    1、创建 server

    server 的创建比较简单,其实就下面一个方法:

    1. opts := defaultServerOptions
    2. for _, o := range opt {
    3. o.apply(&opts)
    4. }
    5. s := &Server{
    6. lis: make(map[net.Listener]bool),
    7. opts: opts,
    8. conns: make(map[transport.ServerTransport]bool),
    9. m: make(map[string]*service),
    10. quit: grpcsync.NewEvent(),
    11. done: grpcsync.NewEvent(),
    12. czData: new(channelzData),
    13. }
    14. s.cv = sync.NewCond(&s.mu)
    15. if EnableTracing {
    16. _, file, line, _ := runtime.Caller(1)
    17. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    18. }
    19. if channelz.IsOn() {
    20. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
    21. }
    22. return s
    23. }
    1. // Server is a gRPC server to serve RPC requests.
    2. type Server struct {
    3. // serverOptions 就是描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,跟 http Headers 类似,我们这里就暂时先不管
    4. opts serverOptions
    5. // 一个互斥锁
    6. mu sync.Mutex // guards following
    7. // listener map
    8. lis map[net.Listener]bool
    9. // connections map
    10. conns map[transport.ServerTransport]bool
    11. // server 是否在处理请求的一个状态位
    12. serve bool
    13. drain bool
    14. cv *sync.Cond // signaled when connections close for GracefulStop
    15. // service map
    16. m map[string]*service // service name -> service info
    17. events trace.EventLog
    18. quit *grpcsync.Event
    19. done *grpcsync.Event
    20. channelzRemoveOnce sync.Once
    21. serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
    22. channelzID int64 // channelz unique identification number
    23. czData *channelzData
    24. }

    虽然 server 结构体里面各种乱起八糟的字段,但是我们可以先不管哈哈哈,比较重要的无非就是三个 map 表分别用来存放多个 listener 、connection 和 service。其他字段都是为了实现协议描述或者并发控制的功能。我们重点关注下

    1. m map[string]*service

    这个结构,service 中主要包含了 MethodDesc 和 StreamDesc 这两个 map

    1. type service struct {
    2. server interface{} // the server for service methods
    3. md map[string]*MethodDesc
    4. sd map[string]*StreamDesc
    5. mdata interface{}
    6. }

    2、server 注册

    server 的注册调用了 RegisterGreeterServer 方法,这个方法是 pb.go 文件里面的,如下:

    这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :

    1. var _Greeter_serviceDesc = grpc.ServiceDesc{
    2. ServiceName: "helloworld.Greeter",
    3. HandlerType: (*GreeterServer)(nil),
    4. Methods: []grpc.MethodDesc{
    5. {
    6. MethodName: "SayHello",
    7. Handler: _Greeter_SayHello_Handler,
    8. },
    9. {
    10. MethodName: "SayHelloAgain",
    11. Handler: _Greeter_SayHelloAgain_Handler,
    12. },
    13. },
    14. Metadata: "helloworld.proto",
    15. }

    我们来看看 RegisterService 这个方法,可以看到主要是调用了 register 方法,register 方法则按照方法名为 key,将方法注入到 server 的 service map 中。看到这里我们其实可以预测一下,server 不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理

    1. ht := reflect.TypeOf(sd.HandlerType).Elem()
    2. st := reflect.TypeOf(ss)
    3. if !st.Implements(ht) {
    4. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    5. }
    6. s.register(sd, ss)
    7. }
    8. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    9. s.mu.Lock()
    10. defer s.mu.Unlock()
    11. s.printf("RegisterService(%q)", sd.ServiceName)
    12. if s.serve {
    13. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    14. }
    15. if _, ok := s.m[sd.ServiceName]; ok {
    16. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    17. }
    18. srv := &service{
    19. server: ss,
    20. md: make(map[string]*MethodDesc),
    21. sd: make(map[string]*StreamDesc),
    22. mdata: sd.Metadata,
    23. }
    24. for i := range sd.Methods {
    25. d := &sd.Methods[i]
    26. srv.md[d.MethodName] = d
    27. }
    28. for i := range sd.Streams {
    29. d := &sd.Streams[i]
    30. srv.sd[d.StreamName] = d
    31. }
    32. s.m[sd.ServiceName] = srv
    33. }

    3、Serve 过程

    回想所有 C/S 模式下,client 和 server 的通信基本是类似的。大致过程无非是 server 通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求类型和请求参数,调用不同的 handler 进行处理,回写响应数据。

    所以,对 server 端来说,主要是了解其如何实现监听,如何为请求分配不同的 handler 和 回写响应数据。

    1. for {
    2. rawConn, err := lis.Accept()
    3. ......
    4. s.serveWG.Add(1)
    5. go func() {
    6. s.handleRawConn(rawConn)
    7. s.serveWG.Done()
    8. }()
    9. }

    ok,我们已经看到了监听过程,server 的监听果然是通过一个死循环 调用了 lis.Accept() 进行端口监听。

    继续往下看,我们发现新起协程调用了 handleRawConn 这个方法,为了节约篇幅,我们直接看重点代码,如下:

    1. func (s *Server) handleRawConn(rawConn net.Conn) {
    2. ...
    3. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    4. ...
    5. // Finish handshaking (HTTP2)
    6. st := s.newHTTP2Transport(conn, authInfo)
    7. if st == nil {
    8. return
    9. }
    10. ...
    11. go func() {
    12. s.serveStreams(st)
    13. s.removeConn(st)
    14. }()
    15. }

    可以看到 handleRawConn 里面实现了 http 的 handshake,还记得之前我们说过,grpc 是基于 http2 实现的吗?这里是不是实锤了……. 发现又通过一个新的协程调用了 serveStreams 这个方法,这个方法干了啥呢?

    其实它主要调用了 handleStream ,继续跟进 handleStream 方法,我们发现了重要线索,如下(省略了部分无关代码)

    1. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    2. sm := stream.Method()
    3. ...
    4. service := sm[:pos]
    5. method := sm[pos+1:]
    6. srv, knownService := s.m[service]
    7. if md, ok := srv.md[method]; ok {
    8. s.processUnaryRPC(t, stream, srv, md, trInfo)
    9. return
    10. }
    11. if sd, ok := srv.sd[method]; ok {
    12. s.processStreamingRPC(t, stream, srv, sd, trInfo)
    13. return
    14. }
    15. }
    16. ...
    17. }

    重要线索就是这一行

    1. srv, knownService := s.m[service]

    还记得我们之前的预测吗?根据 serviceName 去 server 中的 service map,也就是 m 这个字段,里面去取出 handler 进行处理。我们 hello world 这个 demo 的请求不涉及到 stream ,所以直接取出 handler ,然后传给 processUnaryRPC 这个方法进行处理。

    1. if md, ok := srv.md[method]; ok {
    2. s.processUnaryRPC(t, stream, srv, md, trInfo)
    3. return
    4. }

    再来看看 processUnaryRpc 这个方法

    1. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
    2. ...
    3. sh := s.opts.statsHandler
    4. if sh != nil {
    5. beginTime := time.Now()
    6. begin := &stats.Begin{
    7. BeginTime: beginTime,
    8. }
    9. sh.HandleRPC(stream.Context(), begin)
    10. defer func() {
    11. end := &stats.End{
    12. BeginTime: beginTime,
    13. EndTime: time.Now(),
    14. }
    15. if err != nil && err != io.EOF {
    16. end.Error = toRPCErr(err)
    17. }
    18. sh.HandleRPC(stream.Context(), end)
    19. }()
    20. }
    21. ...
    22. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
    23. if err == io.EOF {
    24. // The entire stream is done (for unary RPC only).
    25. return err
    26. }
    27. if s, ok := status.FromError(err); ok {
    28. if e := t.WriteStatus(stream, s); e != nil {
    29. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
    30. }
    31. } else {
    32. switch st := err.(type) {
    33. case transport.ConnectionError:
    34. // Nothing to do here.
    35. default:
    36. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
    37. }
    38. }
    39. if binlog != nil {
    40. h, _ := stream.Header()
    41. binlog.Log(&binarylog.ServerHeader{
    42. Header: h,
    43. })
    44. binlog.Log(&binarylog.ServerTrailer{
    45. Trailer: stream.Trailer(),
    46. Err: appErr,
    47. })
    48. }
    49. return err
    50. }
    51. ...

    我们终于看到了 handler 对 rpc 的处理:

    至此,server 端我们的目标实现,追踪到了整个请求和监听、handler 处理 和 response 回写的过程。